【小ネタ】[Amazon Kinesis Data Firehose] Transform source records with AWS Lambda でレコード毎に改行を追加する

【小ネタ】[Amazon Kinesis Data Firehose] Transform source records with AWS Lambda でレコード毎に改行を追加する

Clock Icon2021.08.08

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

IoT事業部の平内(SIN)です。

ストリームに流れるレコードを、そのままS3へ保存するような場合、Amazon Kinesis Data Firehoseが非常に便利に利用されると思います。そして、レコードが、JSONのようなテキスト形式だった場合に、レコード毎の区切りが分かるように改行等を入れて保存すると、利用しやすいかも知れません。

このことから、IoT Coreのルールでは、Amazon Kinesis Data Firehoseへ送る際、改行等のセパレータが指定可能になっています。

今回は、Amazon Kinesis Data Streamsを入力ソースとしたAmazon Kinesis Data Firehoseで、S3へ保存する前に、レコード毎に改行を挿入するLambdaを作成してみました。

レコードデータは、手元のMacからboto3で送信しています。

2 Lambda

(1) コード

Lamnbdaのコードは、以下の通りです。

Lambdaには、複数のレコードが到着しますが、レコード毎にデコードして、改行を追加し、再びエンコードします。Lambdaのレスポンスとしては、受信時に取得したRecordIdresult を付与したものになります。

今回は、改行の挿入だけですが、ここでは、レコードの変換を自由に行うことができます。また、レコードがJSON形式の場合は、一旦、パースして、JSONとしてその内容を扱うことも可能です。


import json
import base64

def lambda_handler(event, context):

    results = []
    records = event["records"]
    for record in records:
        recordId = record["recordId"]
        data = record["data"]
        
        # デコード
        decoded_data = base64.b64decode(data).decode("utf-8")

        # JSONで処理する場合は、jsonへのパースを行う
        #payload = json.loads(decoded_data)  
        #decoded_data = json.dumps(payload)
        
        # 改行を追加する
        decoded_data = decoded_data + '\n';
        
        # エンコード
        data = base64.b64encode(decoded_data.encode())

        results.append({
            "result":"Ok",
            "recordId":recordId,
            "data":data
        })
        
    return {
        "records":results
    }

(2) タイムアウト

Lambdaのタイムアウトは、最低1分としないと、アラートが出てました。

(3) アクセス権

Lambdaには、特にアクセス権を追加する必要はありません。

3 Kinesis Data Firehose

(1) Transform source records with AWS Lambda

Source record transformationEnableとし、作成したLambdaを設定します。

(2) Amazon S3 destination

送り先は、S3バケットとし、今回は、バッファサイズ、1MiB、インターバルは、60secとしています。

4 動作確認

動作確認のために使用したコードは以下の通りです。手元のMacから20レコードほど送信しました。

import json
import random
import datetime
from boto3.session import Session

profile = 'developer'
session = Session(profile_name = profile)
kinesis = session.client('kinesis')

stream_name = 'test'

for i in range(20):
    data = {
        "index": i,
        "timestamp": datetime.datetime.now().isoformat(sep=' ', timespec='milliseconds'),
        "message": "hello"
    }
    try:
        response = kinesis.put_record(
            StreamName = stream_name, 
            PartitionKey = str(random.randrange(0,100)), 
            Data = json.dumps(data)
        )
    except Exception as e:
        print("Exception: {}", e.args)
    print('put_record SequenceNumber:{}'.format(response['SequenceNumber']))

S3バケットに保存されたデータです。

中身を確認すると、各レコードごと改行されていることが確認できます。

5 最後に

今回は、Amazon Kinesis Data Firehoseで、Lambdaによるレコードへの改行追加を行ってみました。

Pythonによる、Transform source records with AWS Lambdaのコードスニペットとして、個人的には重宝しそうです。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.